package scalapackage.testspark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
  * Created by Germmy on 2018/5/9.
  */
object SparkRddTest {

  /**
    * Walks through common RDD transformations and actions on a local Spark context.
    *
    * Most of the RDDs built here are never materialised (transformations are lazy); the
    * commented-out `println` lines can be re-enabled to inspect any intermediate result.
    * Only the `cogroup` and `cartesian` results are printed.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkRdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    // Always release the context, even if one of the jobs below fails.
    try {
      val rdd1: RDD[Int] = sc.parallelize(List(4, 5, 1, 9, 10, 8, 7, 6))

      // 1. Multiply every element by 2, then sort.
      // `_ * 2` is shorthand for the function literal `x => x * 2`: the argument of `map`
      // is itself a function that is applied to each element of the RDD.
      val rdd2: RDD[Int] = rdd1.map(_ * 2)
      // println(rdd2.collect().toBuffer)

      val rdd3: RDD[Int] = rdd2.sortBy(x => x, ascending = true)
      // println(rdd3.collect().toBuffer)

      // Keep only the elements that are >= 10.
      val rdd4: RDD[Int] = rdd3.filter(_ >= 10)
      // println(rdd4.collect().toBuffer)

      // Split each string, then flatten the pieces into a single RDD.
      // NOTE(review): "a,b,c" is comma-separated, so splitting on a space leaves it
      // intact -- confirm whether that is intended.
      val rdd5: RDD[String] = sc.parallelize(Array("a,b,c", "d e f", "h i j"))
      val rdd6: RDD[String] = rdd5.flatMap(_.split(" "))

      // A nested variant: flatten a list of lists of space-separated strings.
      val rdd7: RDD[List[String]] = sc.parallelize(
        List(List("a b c", "a b b"), List("d e f", "a b b"), List("h i j", "a b b")))
      val rdd8: RDD[String] = rdd7.flatMap(_.flatMap(_.split(" ")))
      // println(rdd8.collect().toBuffer)

      val rdd9: RDD[Int] = sc.parallelize(List(5, 6, 4, 3))
      val rdd10: RDD[Int] = sc.parallelize(List(1, 2, 3, 4))

      // Union (duplicates are kept).
      val rdd11: RDD[Int] = rdd9.union(rdd10)
      // println("union: " + rdd11.collect().toBuffer)

      // Intersection.
      val rdd12: RDD[Int] = rdd9.intersection(rdd10)
      // println("intersection: " + rdd12.collect().toBuffer)

      // Remove duplicates from the union.
      val rdd13: RDD[Int] = rdd11.distinct()
      // println("distinct: " + rdd13.collect().toBuffer)

      val rdd14: RDD[(String, Int)] =
        sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
      val rdd15: RDD[(String, Int)] =
        sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

      // Inner join on key.
      val rdd16: RDD[(String, (Int, Int))] = rdd14.join(rdd15)
      // println("join: " + rdd16.collect().toBuffer)

      // Left outer join: the right-hand value is optional.
      val rdd17: RDD[(String, (Int, Option[Int]))] = rdd14.leftOuterJoin(rdd15)
      // println("left join: " + rdd17.collect().toBuffer)

      // Right outer join: the left-hand value is optional.
      val rdd18: RDD[(String, (Option[Int], Int))] = rdd14.rightOuterJoin(rdd15)
      // println("right join: " + rdd18.collect().toBuffer)

      // Union of the two pair RDDs.
      val rdd19: RDD[(String, Int)] = rdd14.union(rdd15)
      // println(rdd19.collect().toBuffer)

      // Group the values by key; the grouped RDD is shared by the next two examples.
      val grouped: RDD[(String, Iterable[Int])] = rdd19.groupByKey()
      val rdd20: mutable.Buffer[(String, Iterable[Int])] = grouped.collect().toBuffer
      // println(rdd20)

      // Word count, implemented with both groupByKey and reduceByKey.
      // 1. groupByKey, then sum each group.
      val rdd21: mutable.Buffer[(String, Int)] = grouped.mapValues(_.sum).collect().toBuffer
      // println(rdd21)

      // 2. reduceByKey.
      val rdd22: mutable.Buffer[(String, Int)] = rdd19.reduceByKey(_ + _).collect().toBuffer
      // println(rdd22)

      // cogroup: unlike groupByKey, it works across two RDDs and yields, per key,
      // one Iterable of values from each side.
      val rdd23: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd14.cogroup(rdd15)
      println(rdd23.collect().toBuffer)

      val rdd24: RDD[(String, Int)] =
        sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
      val rdd25: RDD[(String, Int)] =
        sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
      val rdd26: RDD[(String, Int)] = rdd24.union(rdd25)

      // Aggregate by key; reduceByKey is all that is needed. The aggregated RDD is
      // reused by the sorting examples below.
      val summed: RDD[(String, Int)] = rdd26.reduceByKey(_ + _)
      val rdd27: mutable.Buffer[(String, Int)] = summed.collect().toBuffer
      // println(rdd27)

      // Sort by value, descending.
      // The roundabout way: swap to (value, key), sortByKey, then swap back.
      val rdd28: mutable.Buffer[(String, Int)] =
        summed.map(_.swap).sortByKey(ascending = false).map(_.swap).collect().toBuffer
      // println(rdd28)

      // The direct way: sortBy on the value.
      val rdd29: mutable.Buffer[(String, Int)] =
        summed.sortBy(_._2, ascending = false).collect().toBuffer
      // println(rdd29)

      // Cartesian product.
      val rdd30: RDD[((String, Int), (String, Int))] = rdd24.cartesian(rdd25)
      println(rdd30.collect().toBuffer)

      // Other actions worth trying: count, top, take, first, takeOrdered.
    } finally {
      sc.stop()
    }
  }

}
